package com.tpvlog.rpc.protocol.handler;

import com.tpvlog.rpc.core.RpcRequest;
import com.tpvlog.rpc.core.RpcResponse;
import com.tpvlog.rpc.core.utils.RpcServiceHelper;
import com.tpvlog.rpc.protocol.MsgHeader;
import com.tpvlog.rpc.protocol.MsgStatus;
import com.tpvlog.rpc.protocol.MsgType;
import com.tpvlog.rpc.protocol.RpcProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cglib.reflect.FastClass;

import java.util.Map;

/**
 * Inbound Netty handler that serves RPC requests.
 *
 * <p>For every decoded {@code RpcProtocol<RpcRequest>} it looks up the target service bean in
 * {@code rpcServiceMap}, invokes the requested method through CGLIB {@link FastClass}, and writes
 * an {@code RpcProtocol<RpcResponse>} back on the same channel.
 */
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    private static final Logger LOG = LoggerFactory.getLogger(RpcRequestHandler.class);

    /** Used when a request carries no parameter types (treated as a no-arg method lookup). */
    private static final Class<?>[] NO_PARAMETER_TYPES = new Class<?>[0];

    /** Service beans keyed by the value produced by {@code RpcServiceHelper.buildServiceKey}. */
    private final Map<String, Object> rpcServiceMap;

    /**
     * @param rpcServiceMap registry of service beans, keyed by service key
     */
    public RpcRequestHandler(Map<String, Object> rpcServiceMap) {
        this.rpcServiceMap = rpcServiceMap;
    }

    /**
     * Hands the request to {@code RpcRequestProcessor} and, once processed, writes the response.
     *
     * <p>The request header object is reused for the response: its message type is switched to
     * RESPONSE and its status is set to SUCCESS or FAIL depending on the outcome.
     *
     * @param ctx      channel context used to write the response
     * @param protocol decoded request (header + body)
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol) {

        // Process the request via RpcRequestProcessor (presumably off the I/O thread - verify
        // against RpcRequestProcessor's implementation).
        RpcRequestProcessor.submitRequest(() -> {
            // Build the response envelope, reusing the request header.
            RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();

            MsgHeader header = protocol.getHeader();
            header.setMsgType(MsgType.RESPONSE.getType().byteValue());
            resProtocol.setHeader(header);

            RpcResponse response = new RpcResponse();
            resProtocol.setBody(response);

            try {
                // Invoke the target service method synchronously within this task.
                Object result = handle(protocol.getBody());
                response.setData(result);
                header.setStatus(MsgStatus.SUCCESS.getCode().byteValue());
            } catch (Throwable throwable) {
                // Any failure (lookup or invocation) is reported to the caller as FAIL.
                header.setStatus(MsgStatus.FAIL.getCode().byteValue());
                response.setMessage(throwable.toString());
                LOG.error("process request {} error", header.getRequestId(), throwable);
            }
            ctx.writeAndFlush(resProtocol);
        });
    }

    /**
     * Resolves the service bean for the request and invokes the requested method on it.
     *
     * @param request the RPC request describing class name, version, method and arguments
     * @return the value returned by the service method
     * @throws Throwable the exception thrown by the service method itself (unwrapped from
     *                   {@code InvocationTargetException}), a {@link RuntimeException} if the
     *                   service is not registered, or a {@link NoSuchMethodException} if no
     *                   method matches the requested name and parameter types
     */
    private Object handle(RpcRequest request) throws Throwable {
        String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());
        // Look up the service bean registered under this key.
        Object serviceBean = rpcServiceMap.get(serviceKey);

        if (serviceBean == null) {
            // Report the version (part of the lookup key) rather than the method name.
            throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getServiceVersion()));
        }

        // Invoke the method through CGLIB's FastClass mechanism.
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        if (parameterTypes == null) {
            // A missing type array is treated as a no-arg signature instead of failing with an NPE.
            parameterTypes = NO_PARAMETER_TYPES;
        }
        Object[] parameters = request.getParams();

        FastClass fastClass = FastClass.create(serviceClass);
        int methodIndex = fastClass.getIndex(methodName, parameterTypes);
        if (methodIndex < 0) {
            // getIndex signals "no matching method" with a negative index.
            throw new NoSuchMethodException(String.format("method not exist: %s.%s%s",
                    request.getClassName(), methodName, java.util.Arrays.toString(parameterTypes)));
        }

        try {
            return fastClass.invoke(methodIndex, serviceBean, parameters);
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Surface the service method's own exception so the caller sees a meaningful message
            // instead of a bare InvocationTargetException (whose message is null).
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        }
    }
}
